Logo
Published on

2.1.Flask 增删改查

Authors
  • avatar
    Name
    xiaobai
    Twitter

1.学习目标与总体设计

  • 通过 Flask 与 SQLAlchemy 构建一套标准的用户管理接口,实现创建、读取、更新与删除(CRUD)。
  • 内容包含:项目结构、数据库配置、模型定义、迁移、RESTful 路由、请求校验、事务处理、分页/过滤/排序、错误与返回格式、性能优化建议。

2.环境与依赖

安装依赖:

pip install flask flask-sqlalchemy pymysql marshmallow
# 数据迁移工具(可选但推荐)
pip install alembic

3.项目结构建议

app/
  __init__.py        # 应用工厂 & 扩展初始化
  config.py          # 配置管理(环境变量)
  extensions.py      # 数据库等扩展实例(db = SQLAlchemy()  models.py          # SQLAlchemy 模型定义
  schemas.py         # 请求/响应校验与序列化(Marshmallow)
  routes.py          # 路由(增删改查,实现 REST 接口)
wsgi.py              # 入口(create_app 并注册蓝图)
alembic/             # 迁移目录(由 alembic init 初始化)

4.配置与应用工厂

# app/config.py
import os

class Config:
    SQLALCHEMY_DATABASE_URI = os.getenv(
        'DATABASE_URL', 'mysql+pymysql://user:pwd@127.0.0.1:3306/demo?charset=utf8mb4'
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JSON_SORT_KEYS = False  # 保留字段顺序,响应更易读

# app/extensions.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

# app/__init__.py
from flask import Flask
from .config import Config
from .extensions import db

def create_app(config_class: type = Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    db.init_app(app)
    return app

wsgi.py 中注册路由蓝图:

# wsgi.py
from app import create_app
from app.routes import api

app = create_app()
app.register_blueprint(api)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

5.模型与迁移

5.1.模型定义

# app/models.py
from datetime import datetime
from .extensions import db

class User(db.Model):
    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), nullable=False, unique=True, index=True)
    email = db.Column(db.String(100), nullable=False, unique=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

5.2.Alembic 迁移(推荐)

初始化与生成迁移(示例流程):

alembic init alembic
# 编辑 alembic.ini 与 alembic/env.py,指向应用的数据库 URL
alembic revision -m "create user table"
alembic upgrade head

开发阶段也可临时使用 db.create_all() 初始化(生产环境请使用迁移)。

6.请求校验与序列化(Marshmallow)

# app/schemas.py
from marshmallow import Schema, fields, validate

class UserCreateSchema(Schema):
    username = fields.Str(required=True, validate=validate.Length(min=3, max=50))
    email = fields.Email(required=True)

class UserUpdateSchema(Schema):
    username = fields.Str(validate=validate.Length(min=3, max=50))
    email = fields.Email()

class UserOutSchema(Schema):
    id = fields.Int()
    username = fields.Str()
    email = fields.Email()
    created_at = fields.DateTime()
    updated_at = fields.DateTime()

user_create_schema = UserCreateSchema()
user_update_schema = UserUpdateSchema()
user_out_schema = UserOutSchema()
users_out_schema = UserOutSchema(many=True)

7.RESTful 路由(CRUD 实现)

# app/routes.py
from flask import Blueprint, request, jsonify
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from .extensions import db
from .models import User
from .schemas import (
    user_create_schema,
    user_update_schema,
    user_out_schema,
    users_out_schema,
)

api = Blueprint('api', __name__, url_prefix='/api')

def ok(data=None, status=200):
    return jsonify({'code': 0, 'message': 'ok', 'data': data}), status

def err(message, status=400, code=1):
    return jsonify({'code': code, 'message': message, 'data': None}), status

@api.get('/users')
def list_users():
    # 分页、过滤、排序
    page = int(request.args.get('page', 1))
    page_size = min(int(request.args.get('page_size', 10)), 100)
    q = request.args.get('q')  # 按用户名搜索
    sort = request.args.get('sort', 'created_at')  # 排序字段
    order = request.args.get('order', 'desc')      # 排序方向

    query = User.query
    if q:
        query = query.filter(User.username.like(f'%{q}%'))

    # 简单排序控制
    sort_col = getattr(User, sort, User.created_at)
    query = query.order_by(sort_col.desc() if order == 'desc' else sort_col.asc())

    # 统计总数并分页
    total = query.count()
    items = query.offset((page - 1) * page_size).limit(page_size).all()

    return ok({'items': users_out_schema.dump(items), 'page': page, 'page_size': page_size, 'total': total})

@api.get('/users/<int:user_id>')
def get_user(user_id):
    user = User.query.get(user_id)
    if not user:
        return err('user not found', status=404)
    return ok(user_out_schema.dump(user))

@api.post('/users')
def create_user():
    # 请求体验证
    json_data = request.get_json(silent=True) or {}
    errors = user_create_schema.validate(json_data)
    if errors:
        return err(errors, status=422)

    user = User(username=json_data['username'], email=json_data['email'])
    try:
        db.session.add(user)
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return err('username or email already exists', status=409)
    except Exception as e:
        db.session.rollback()
        return err(f'create failed: {str(e)}', status=500)

    return ok({'id': user.id}, status=201)

@api.put('/users/<int:user_id>')
@api.patch('/users/<int:user_id>')
def update_user(user_id):
    user = User.query.get(user_id)
    if not user:
        return err('user not found', status=404)

    json_data = request.get_json(silent=True) or {}
    errors = user_update_schema.validate(json_data)
    if errors:
        return err(errors, status=422)

    # 局部更新
    user.username = json_data.get('username', user.username)
    user.email = json_data.get('email', user.email)

    try:
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return err('username or email already exists', status=409)
    except Exception as e:
        db.session.rollback()
        return err(f'update failed: {str(e)}', status=500)

    return ok(user_out_schema.dump(user))

@api.delete('/users/<int:user_id>')
def delete_user(user_id):
    user = User.query.get(user_id)
    if not user:
        return err('user not found', status=404)
    try:
        db.session.delete(user)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return err(f'delete failed: {str(e)}', status=500)
    return ok({'deleted': user_id})

8.统一错误处理与返回格式

建议通过 @app.errorhandler 统一处理常见错误,并保持返回结构一致:

from flask import jsonify

def register_error_handlers(app):
    @app.errorhandler(404)
    def not_found(e):
        return jsonify({'code': 1, 'message': 'not found', 'data': None}), 404

    @app.errorhandler(500)
    def server_error(e):
        return jsonify({'code': 1, 'message': 'server error', 'data': None}), 500

9.分页、过滤、排序的设计要点

  • 分页:page/page_size,限制最大 page_size 防止过大查询。
  • 过滤:对字符串模糊匹配使用索引列谨慎设计;必要时考虑前缀搜索或额外搜索服务。
  • 排序:仅允许白名单字段;避免任意列排序导致慢查询或安全问题。
  • 性能:
    • 大数据集避免 count() 的频繁调用,可延迟或提供近似总数;
    • 使用合适索引;
    • 对高并发写操作增加唯一约束与幂等策略。

10.安全与事务

  • 使用 ORM 自动参数绑定,避免 SQL 注入。
  • 对写操作包裹事务并在异常时 rollback();在业务层面制定冲突与重试策略(如 409)。

11.curl 测试示例

# 创建
curl -X POST http://127.0.0.1:5000/api/users \
  -H 'Content-Type: application/json' \
  -d '{"username":"alice","email":"alice@test.com"}'

# 分页查询
curl 'http://127.0.0.1:5000/api/users?page=1&page_size=10&q=ali&sort=created_at&order=desc'

# 详情
curl http://127.0.0.1:5000/api/users/1

# 更新
curl -X PATCH http://127.0.0.1:5000/api/users/1 \
  -H 'Content-Type: application/json' \
  -d '{"email":"alice@new.com"}'

# 删除
curl -X DELETE http://127.0.0.1:5000/api/users/1

12.扩展与进阶

  • 引入 Alembic 管理迁移与版本;
  • 引入日志、统一响应中间件、鉴权(见《Flask 接口认证》);
  • 对 N+1 查询使用 selectinload 等关系加载优化;
  • 对海量分页考虑 keyset pagination(基于主键游标)。

13.总结

  • 通过 Flask + SQLAlchemy + Marshmallow 构建一套规范的 CRUD;
  • 结合迁移、错误处理、分页过滤排序与性能优化,满足生产需求。